feat(db): expose tracked source record subscriptions #1500
lalitkapoor wants to merge 3 commits into TanStack:main
Adds a `getTrackedSourceRecords` / `subscribeTrackedSourceRecords` API on both base collections and live-query collections so consumers can observe which source records the IVM pipeline is currently using.
Architecture (three scopes):
- Per-alias dedup (`sentToD2Keys` in CollectionSubscriber): net add/remove transitions only, no churn from updates that keep the same key in range.
- Per-live-query aggregator: refcounts across aliases (handles self-joins), exposed only while the live query has subscribers.
- Per-base-collection manager: refcounts across all live queries that touch the collection.
The aggregator propagates net 0↔1 transitions to each source collection's `_trackedSourceRecords` manager, gated on the live query being exposed (via `subscribers:change`).
Performance:
- O(|batch|) on the hot path: net counting, no O(N) Set copies.
- Skips record-object allocation when no listeners are attached.
- Truncate path no longer eagerly clears `sentToD2Keys`; the truncate's delete events drain it through `filterDuplicateInserts`, and the merged delete + re-insert batch nets to zero (no spurious churn).
Includes regression tests for stable-membership ordered queries, truncate-refetch parity, and lazy-join key dedup.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comments 1, 2, and 3 from kevin-dp:
1. Stale comment in `collection-subscriber.ts` truncate handler — updated
to explain why `sentToD2Keys.clear()` is no longer needed (truncate's
delete events drain it through `filterDuplicateInserts`, and the
merged delete+re-insert batch nets to zero).
2. Shared `utils` mutation footgun — eliminated. `createCollection` is
now byte-identical to upstream (`collection.utils = options.utils ?? {}`,
no in-place mutation). Tracked-source helpers moved off `utils` to
direct Collection methods (`subscribeTrackedSourceRecords`,
`getTrackedSourceRecords`), so the entire shared-utils surface area
for this feature is gone.
3. Reversed merge precedence — restored upstream
`{ ...options.utils, ...config.utils }` (config wins).
Polymorphic view restoration:
The earlier cleanup removed the live-query-local view (the per-query
"records this query reads from its sources"). Restored under the same
method name on `Collection` via polymorphic dispatch:
- Base collection -> `_trackedSourceRecords` manager
("records of mine consumed by live queries")
- Live-query collection -> `_liveQueryTrackedSourceView`
("records this query reads from its sources")
`bridgeToCreateCollection` wires the live-query view at construction
time. `LiveQueryTrackedSourceRecordsAggregator` regains its listener
fan-out; `CollectionConfigBuilder` owns a long-lived listener Set that
survives sync sessions, so external subscribers don't need to re-attach
across start/stop cycles.
Three new tests cover the live-query-local view: snapshot deltas,
includeInitialState replay, and parity with the base-collection view.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
kevin-dp
left a comment
Hi @lalitkapoor
Thanks for your contribution!
I reviewed your PR and left some comments. I also asked Claude to review, and it reported some test gaps; it may be worth adding those tests:
Test gap: TrackedSourceRecordsManager.subscribe includeInitialState replay
In every base-collection test, `subscribeTrackedSourceRecords` is called before `subscribeChanges`, so the manager is empty when the listener attaches and the initial-state replay path:
```ts
// tracked-source-records.ts
if (options?.includeInitialState && this.entries.size > 0) {
  callback({ added: this.get(), removed: [] })
}
```
is never executed. The mirror path in `liveQueryTrackedSourceView.subscribe` is covered (test "should replay the live-query-local snapshot as initial state…"). Worth adding a base-collection counterpart: start a live query, preload, then subscribe to the base collection with `includeInitialState: true` and assert one immediate `added` event.
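To make the ordering concrete, here is a minimal sketch of the scenario. `ManagerSketch` and its `track` method are hypothetical stand-ins, not the PR's `TrackedSourceRecordsManager`; only the replay condition is copied from the snippet above. The point is that entries must exist before `subscribe` runs for the replay branch to fire.

```typescript
type TrackedDelta<K> = { added: K[]; removed: K[] }
type Listener<K> = (delta: TrackedDelta<K>) => void

// Simplified, hypothetical stand-in for the base-collection manager.
class ManagerSketch<K> {
  private entries = new Map<K, number>()
  private listeners = new Set<Listener<K>>()

  // Stand-in for a live query starting to consume `key`.
  track(key: K): void {
    const before = this.entries.get(key) ?? 0
    this.entries.set(key, before + 1)
    if (before === 0) {
      this.listeners.forEach((l) => l({ added: [key], removed: [] }))
    }
  }

  get(): K[] {
    return Array.from(this.entries.keys())
  }

  subscribe(
    callback: Listener<K>,
    options?: { includeInitialState?: boolean },
  ): () => void {
    this.listeners.add(callback)
    // Replay branch: only reachable when entries already exist.
    if (options?.includeInitialState && this.entries.size > 0) {
      callback({ added: this.get(), removed: [] })
    }
    return () => {
      this.listeners.delete(callback)
    }
  }
}

// Populate first (live query started and preloaded), then subscribe.
const manager = new ManagerSketch<string>()
manager.track('todo-1')
manager.track('todo-2')

const events: TrackedDelta<string>[] = []
manager.subscribe(
  (delta) => {
    events.push(delta)
  },
  { includeInitialState: true },
)
// `events` now holds exactly one delta whose `added` lists both keys.
```

A real test would replace `track` with starting and preloading a live query, but the assertion shape (one immediate `added` event, empty `removed`) should carry over.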
Test gap: nested live queries
The PR description says a live query whose source is another live query "works because _trackedSourceRecords lives on CollectionImpl." That's true mechanically — nothing crashes — but there's no end-to-end test, and the design has an interesting implication: liveQueryB will populate liveQueryA._trackedSourceRecords (the base manager), but liveQueryA.subscribeTrackedSourceRecords will always route through _liveQueryTrackedSourceView and never expose that data. So the base-view of any live-query collection is silently unreachable. Either document that, or skip allocating _trackedSourceRecords when _liveQueryTrackedSourceView is set.
Test gap: truncate-refetch with different keys
The truncate test (should not emit tracked source churn across truncate refetch…) only covers the case where the refetched keys equal the previous keys. The interesting opposite case — refetch returns a different set, which should net to non-zero added/removed — isn't covered. Worth adding to pin the contract that real membership changes do still emit across a truncate cycle.
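For illustration, a small sketch of the contract such a test would pin. `KeyChange` and `netKeyDelta` are hypothetical names, and the per-key netting mirrors the approach the PR describes rather than its actual code. A truncate followed by a refetch of a different key set should leave a non-zero delta.

```typescript
// Hypothetical change shape; the real pipeline's change type may differ.
type KeyChange<K> = { type: 'insert' | 'delete'; key: K }

// Net a merged batch of inserts and deletes per key, then partition by sign.
function netKeyDelta<K>(
  batch: ReadonlyArray<KeyChange<K>>,
): { added: K[]; removed: K[] } {
  const net = new Map<K, number>()
  for (const change of batch) {
    const step = change.type === 'insert' ? 1 : -1
    net.set(change.key, (net.get(change.key) ?? 0) + step)
  }
  const added: K[] = []
  const removed: K[] = []
  net.forEach((count, key) => {
    if (count > 0) added.push(key)
    else if (count < 0) removed.push(key)
  })
  return { added, removed }
}

// Truncate deletes keys 1 and 2; the refetch re-inserts 2 and brings in 3.
const delta = netKeyDelta<number>([
  { type: 'delete', key: 1 },
  { type: 'delete', key: 2 },
  { type: 'insert', key: 2 },
  { type: 'insert', key: 3 },
])
console.log(delta) // added: [3], removed: [1]
```

Key 2 nets to zero and stays silent, which is the existing same-keys case; keys 1 and 3 are the real membership changes the new test should observe.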
```ts
// Aggregated view of source-records currently being used by active live
// queries that depend on this collection. Public so live-query aggregators
// can push deltas in.
public _trackedSourceRecords: TrackedSourceRecordsManager<TKey>
```
It's a bit weird to have underscored variables that are exposed publicly. We could replace the direct cross-class access with a method on CollectionImpl (e.g. applyTrackedSourceDelta(added, removed) that delegates internally). Then the field can be private.
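A rough sketch of what that could look like. The classes below are simplified stand-ins, not the PR's code; `applyTrackedSourceDelta` is the name floated in this comment, and the key-only return type of `getTrackedSourceRecords` is an assumption.

```typescript
// Minimal stand-in for the manager; only refcounting is modelled.
class TrackedSourceRecordsManager<K> {
  private refCounts = new Map<K, number>()

  apply(added: ReadonlyArray<K>, removed: ReadonlyArray<K>): void {
    for (const key of added) {
      this.refCounts.set(key, (this.refCounts.get(key) ?? 0) + 1)
    }
    for (const key of removed) {
      const next = (this.refCounts.get(key) ?? 0) - 1
      if (next > 0) this.refCounts.set(key, next)
      else this.refCounts.delete(key)
    }
  }

  keys(): K[] {
    return Array.from(this.refCounts.keys())
  }
}

class CollectionImpl<K> {
  // Private: aggregators can no longer reach into the manager directly.
  private trackedSourceRecords = new TrackedSourceRecordsManager<K>()

  // Hypothetical method name taken from the suggestion above.
  applyTrackedSourceDelta(
    added: ReadonlyArray<K>,
    removed: ReadonlyArray<K>,
  ): void {
    this.trackedSourceRecords.apply(added, removed)
  }

  getTrackedSourceRecords(): K[] {
    return this.trackedSourceRecords.keys()
  }
}

const users = new CollectionImpl<string>()
users.applyTrackedSourceDelta(['u1', 'u2'], [])
users.applyTrackedSourceDelta([], ['u1'])
console.log(users.getTrackedSourceRecords()) // ['u2']
```

The aggregator would then call `applyTrackedSourceDelta` instead of touching the field, so the manager stays an implementation detail of the collection.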
```ts
// during construction; undefined on base collections. When present, the
// public `getTrackedSourceRecords` / `subscribeTrackedSourceRecords`
// methods route to this view instead of `_trackedSourceRecords`.
public _liveQueryTrackedSourceView?: {
```
Similarly, it's a bit weird to have this public underscored variable. We could avoid this by passing the view through CollectionConfig (or a CollectionImpl constructor argument) instead of assigning it post-construction. Then the field can be readonly and the assignment site moves into the constructor. The builder already exists at the moment bridgeToCreateCollection calls createCollection, so this is mechanically straightforward — just thread it through CollectionConfig.
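Sketching the idea under some assumptions: the `TrackedSourceView` interface, the `liveQueryTrackedSourceView` config field, and the two-argument constructor are all hypothetical and chosen only to keep the example self-contained. The part that matters is that the view arrives at construction time and is stored as `private readonly`.

```typescript
type TrackedDelta<K> = { added: K[]; removed: K[] }

// Hypothetical view shape; the PR's actual interface is not shown here.
interface TrackedSourceView<K> {
  get(): K[]
  subscribe(callback: (delta: TrackedDelta<K>) => void): () => void
}

interface CollectionConfig<K> {
  id: string
  // Hypothetical field: only the live-query builder would set it.
  liveQueryTrackedSourceView?: TrackedSourceView<K>
}

class CollectionImpl<K> {
  private readonly liveQueryTrackedSourceView?: TrackedSourceView<K>
  private readonly baseView: TrackedSourceView<K>

  constructor(config: CollectionConfig<K>, baseView: TrackedSourceView<K>) {
    // Assigned once here, so no post-construction mutation is needed.
    this.liveQueryTrackedSourceView = config.liveQueryTrackedSourceView
    this.baseView = baseView
  }

  // Same public method; the scope depends on which view is present.
  getTrackedSourceRecords(): K[] {
    return (this.liveQueryTrackedSourceView ?? this.baseView).get()
  }
}

// Helper for the example only: a view over a fixed list of keys.
function staticView<K>(keys: K[]): TrackedSourceView<K> {
  return { get: () => keys, subscribe: () => () => {} }
}

const base = new CollectionImpl<string>(
  { id: 'users' },
  staticView<string>(['u1', 'u2']),
)
const live = new CollectionImpl<string>(
  { id: 'live-users', liveQueryTrackedSourceView: staticView<string>(['u1']) },
  staticView<string>([]),
)
console.log(base.getTrackedSourceRecords()) // ['u1', 'u2']
console.log(live.getTrackedSourceRecords()) // ['u1']
```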
```ts
  [LIVE_QUERY_INTERNAL]: LiveQueryInternalUtils
}

export type LiveQueryCollectionUtils<TUtils extends UtilsRecord = {}> = TUtils &
```
Old: LiveQueryCollectionUtils = UtilsRecord & { built-ins } (string-indexable).
New: LiveQueryCollectionUtils<TUtils = {}> = TUtils & LiveQueryBuiltInUtils.
LiveQueryCollectionUtils (no type arg) is now {} & LiveQueryBuiltInUtils, which lacks the Record<string, any> index signature the old type had. Anyone doing const u: LiveQueryCollectionUtils = ...; u.someCustomKey without specifying TUtils will see a new type error.
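A reduced reproduction of the difference, assuming `UtilsRecord` is `Record<string, any>`. `LiveQueryBuiltInUtils` here is a one-member stand-in, and the `Old…` / `New…` aliases exist only for this example.

```typescript
type UtilsRecord = Record<string, any>

// Stand-in for the built-in utils; the real type has different members.
type LiveQueryBuiltInUtils = { builtIn: () => number }

// Old shape: string-indexable because UtilsRecord has an index signature.
type OldLiveQueryCollectionUtils = UtilsRecord & LiveQueryBuiltInUtils

// New shape: defaults to `{}`, so there is no index signature unless
// TUtils supplies one.
type NewLiveQueryCollectionUtils<TUtils extends UtilsRecord = {}> = TUtils &
  LiveQueryBuiltInUtils

const builtIns: LiveQueryBuiltInUtils = { builtIn: () => 0 }

const oldUtils: OldLiveQueryCollectionUtils = { ...builtIns, someCustomKey: 1 }
// Compiles: the property resolves through the index signature (as `any`).
const fromOld: number = oldUtils.someCustomKey

const newUtils: NewLiveQueryCollectionUtils = builtIns
// @ts-expect-error 'someCustomKey' does not exist on `{} & LiveQueryBuiltInUtils`
newUtils.someCustomKey

// Callers can opt back in by passing TUtils explicitly.
const typedUtils: NewLiveQueryCollectionUtils<{ someCustomKey: number }> = {
  ...builtIns,
  someCustomKey: 1,
}
const fromTyped: number = typedUtils.someCustomKey
```

If the breakage is not intended, defaulting `TUtils` to `UtilsRecord` instead of `{}` would keep the old indexable behaviour for the no-argument case.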
```diff
  }

- return collection
+ return collection as unknown as Collection<
```
Can't we pass these type args to CollectionImpl (in particular UtilsRecord) such that we don't have to do an unsafe typecast here?
```ts
  }
}

for (const key of removed) {
```
Can it happen that a key wasn't known until now and occurs in both added and removed?
In that case it would be part of both netAdded and netRemoved, is that problematic?
I asked Claude about this. I agree with its answer and would go for adding a built-in defense mechanism.
Not from the current callers, but the manager doesn't enforce it. Tracing upward:
- `CollectionSubscriber.emitTrackedKeyDelta` nets per-key first (`Map<key, number>`), then partitions by sign, so the `added` and `removed` it emits are disjoint by construction.
- `LiveQueryTrackedSourceRecordsAggregator.apply` is called with that disjoint pair (single subscriber → single call), and its own `netAdded` / `netRemoved` likewise can't share a key within one call.
- `TrackedSourceRecordsManager.apply` is only called by the aggregator, with the disjoint `netAdded` / `netRemoved`.
So end-to-end the precondition holds today. But if it were violated — added and removed both containing a key K that's not in entries — the manager would:
- Process `added`: `entries.set(K, { refCount: 1 })`, push K to `netAdded`.
- Process `removed`: `existing.refCount === 1` → `entries.delete(K)`, push K to `netRemoved`.
Net result: listeners get { added: [K], removed: [K] } for a key that was never actually tracked. A sync layer wired to this would subscribe(K) then immediately unsubscribe(K) for nothing. That is problematic.
Two other overlap shapes are fine: if K already exists with refCount ≥ 1, the add/remove pair leaves refCount unchanged and emits nothing (the if (existing.refCount === 1) guard correctly skips netRemoved when refCount is now > 1). Only the brand-new-key case is buggy.
The aggregator's apply has the identical shape and the same latent fragility.
Two options:
- Document the precondition in a JSDoc on both `apply` methods: "added and removed must be disjoint; duplicates within either must be deduplicated by the caller." Cheap, makes the assumption explicit, leaves it to future contributors to honor.
- Net defensively, using the same trick as `emitTrackedKeyDelta`:
```ts
const net = new Map<TKey, number>()
for (const k of added) net.set(k, (net.get(k) ?? 0) + 1)
for (const k of removed) net.set(k, (net.get(k) ?? 0) - 1)
// then iterate net entries by sign
```
  Costs one Map allocation per call but eliminates the entire class of caller mistakes (overlap and duplicates within either array).
I'd lean toward netting in the manager (it's the public-API boundary — callers reach through _trackedSourceRecords from outside the class, so the contract is harder to enforce by code review), and just documenting the precondition on the aggregator (private-ish, single internal caller).
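For reference, a self-contained sketch of the netting variant. This is a simplified stand-in for the manager, not the PR's class; treating each key as a single ±1 step by sign is one reading of "iterate net entries by sign" and is an assumption here.

```typescript
type TrackedDelta<K> = { added: K[]; removed: K[] }
type Listener<K> = (delta: TrackedDelta<K>) => void

// Simplified stand-in for the manager with defensive netting in `apply`.
class TrackedSourceRecordsManager<K> {
  private entries = new Map<K, { refCount: number }>()
  private listeners = new Set<Listener<K>>()

  subscribe(listener: Listener<K>): () => void {
    this.listeners.add(listener)
    return () => {
      this.listeners.delete(listener)
    }
  }

  apply(added: ReadonlyArray<K>, removed: ReadonlyArray<K>): void {
    // Net per key first so overlap cancels before `entries` is touched.
    const net = new Map<K, number>()
    for (const k of added) net.set(k, (net.get(k) ?? 0) + 1)
    for (const k of removed) net.set(k, (net.get(k) ?? 0) - 1)

    const netAdded: K[] = []
    const netRemoved: K[] = []
    net.forEach((count, key) => {
      if (count === 0) return // add/remove pair for the same key: no-op
      const refCount = this.entries.get(key)?.refCount ?? 0
      if (count > 0) {
        // Assumption: duplicates within `added` count as one step.
        this.entries.set(key, { refCount: refCount + 1 })
        if (refCount === 0) netAdded.push(key)
      } else if (refCount === 1) {
        this.entries.delete(key)
        netRemoved.push(key)
      } else if (refCount > 1) {
        this.entries.set(key, { refCount: refCount - 1 })
      }
    })

    if (netAdded.length === 0 && netRemoved.length === 0) return
    this.listeners.forEach((listener) =>
      listener({ added: netAdded, removed: netRemoved }),
    )
  }
}

const manager = new TrackedSourceRecordsManager<string>()
const events: TrackedDelta<string>[] = []
manager.subscribe((delta) => {
  events.push(delta)
})

manager.apply(['K'], ['K']) // brand-new key in both arrays: nothing emitted
manager.apply(['A', 'A'], []) // duplicate add counts once: added ['A']
manager.apply([], ['A']) // refcount 1 → 0: removed ['A']
```

With this shape the brand-new-key overlap described above never reaches listeners, and a removal of an untracked key is ignored rather than underflowing.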
Description
Two new APIs let application code observe which source records a live query is consuming, so callers can subscribe/unsubscribe at an underlying sync layer in lockstep with query usage.
What's added
Methods on every `Collection`:
Same method, polymorphic dispatch:
`K`, the base view sees `K` once; only when the last query stops using `K` does it emit `removed`.
Why
Useful for custom sync layers that subscribe/unsubscribe at the record level. They need to subscribe when a record becomes actively used by queries and unsubscribe when the last query stops using it. Base-collection residency is too broad — it covers everything sync'd, whether queries want it or not. These APIs provide the narrower "actively used by live queries" signal.
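As an illustration of that use case, a sketch of how a record-level sync client might be driven from these events. `TrackedSource`, `RecordSyncClient`, and `bindSyncToTrackedRecords` are hypothetical; the `{ added, removed }` payload and the `includeInitialState` option come from this PR, while modelling records as bare keys and returning an unsubscribe function are assumptions.

```typescript
type TrackedDelta<K> = { added: K[]; removed: K[] }

// Hypothetical minimal surface of a collection for this example.
interface TrackedSource<K> {
  subscribeTrackedSourceRecords(
    callback: (delta: TrackedDelta<K>) => void,
    options?: { includeInitialState?: boolean },
  ): () => void
}

// Hypothetical record-level sync client.
interface RecordSyncClient<K> {
  subscribe(key: K): void
  unsubscribe(key: K): void
}

// Keep the sync client's subscriptions in lockstep with query usage.
function bindSyncToTrackedRecords<K>(
  source: TrackedSource<K>,
  client: RecordSyncClient<K>,
): () => void {
  return source.subscribeTrackedSourceRecords(
    ({ added, removed }) => {
      for (const key of added) client.subscribe(key)
      for (const key of removed) client.unsubscribe(key)
    },
    { includeInitialState: true },
  )
}

// Fake source for the example: replays one key, then swaps it for another.
const active = new Set<string>()
const fakeSource: TrackedSource<string> = {
  subscribeTrackedSourceRecords(callback, options) {
    if (options?.includeInitialState) {
      callback({ added: ['todo-1'], removed: [] })
    }
    callback({ added: ['todo-2'], removed: ['todo-1'] })
    return () => {}
  },
}

const stop = bindSyncToTrackedRecords(fakeSource, {
  subscribe: (key) => {
    active.add(key)
  },
  unsubscribe: (key) => {
    active.delete(key)
  },
})
stop()
// `active` now contains only 'todo-2'.
```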
How an app developer uses this
On a single live query (per-query scope):
On a base collection (union scope across all live queries):
The same pattern works for any collection that uses `CollectionImpl` under the hood (electric, query-db, localStorage, sqlite-persisted, etc.). The feature is sync-backend-agnostic because tracking happens in the live-query pipeline (the IVM), not in the sync config.
Behaviour notes:
- `includeInitialState: true` replays the current snapshot as `added` on subscribe (if any records are currently tracked and exposed).
- `0 → 1+` subscribers (a live query gains its first consumer): the per-query view emits `added` for every key the query is reading; the base view emits `added` for any key reaching refcount 1.
- `1+ → 0` subscribers (last consumer leaves): the per-query view emits `removed` for every key it was reading; the base view emits `removed` for any key whose refcount drops to 0.
- `must-refetch` / truncate-refetch cycles that end with the same keys still tracked emit nothing.
How this works internally
Three concentric scopes, each with a real owner.
Scope 1 (alias-level): `CollectionSubscriber.sentToD2Keys`
For every alias in a live query (e.g. `{ user: usersCollection }`), one `CollectionSubscriber` is created. It already maintained `sentToD2Keys: Set<string | number>` to deduplicate inserts into the D2 dataflow graph: the authoritative per-alias record of "keys this alias is feeding the query."
This PR turns that set's mutations into events. On each `sendChangesToPipeline`, after `filterDuplicateInserts` has mutated the set, the subscriber derives a net delta from the filtered change stream itself: every surviving insert is a 0→1 transition for its key, every surviving delete is a 1→0 transition. Insert/delete pairs for the same key within one batch (emitted by `splitUpdates` for stable-membership ordered updates) net to zero. Cost is `O(|batch|)`, with no dependency on the size of `sentToD2Keys`. The result is emitted via a single public callback property assigned 1:1 by the builder.
Scope 2 (per-live-query): `LiveQueryTrackedSourceRecordsAggregator`
One aggregator per sync session. Owns a nested refcount map:
Outer key is `collectionId`, inner key is the source record's primitive `key`; there is no composite serialization. The refcount is over aliases: a self-join like `{ employee: emp, manager: emp }` has two subscribers emitting independently, and the aggregator dedupes.
`apply(collectionId, added, removed)`:
- 0→1 transitions go into `netAdded`, 1→0 into `netRemoved`. Empty inner buckets are pruned.
- If `exposed`: propagate net transitions to `sourceCollections[collectionId]._trackedSourceRecords.apply(...)` (scope 3), and fan out directly to the live-query-local listener set (held by reference from the builder).
`setExposed(boolean)` is the lifecycle gate. `false → true` replays the snapshot as `added`. `true → false` replays as `removed`. The builder wires this to `subscribers:change`:
Scope 3 (per-base-collection): `TrackedSourceRecordsManager`
Each `CollectionImpl` owns one `_trackedSourceRecords: TrackedSourceRecordsManager<TKey>` field. Refcount map is `Map<TKey, { key, refCount }>` over live queries. On 0↔1 transitions, listeners registered via `subscribe` see `added` / `removed`. Record-object allocation is skipped when the listener set is empty.
Polymorphic dispatch on `Collection.subscribeTrackedSourceRecords`
`CollectionImpl` has an optional `_liveQueryTrackedSourceView` field. It's `undefined` on every collection except live queries; `bridgeToCreateCollection` sets it at construction time to a stable adapter on `CollectionConfigBuilder` (which routes through whatever aggregator the current sync session has). The public methods on `CollectionImpl`:
So callers don't need to branch on collection type. Same method name, scope determined by what kind of collection you're holding.
The live-query listener set lives on `CollectionConfigBuilder` (long-lived). The aggregator holds it by reference. External subscribers attached before the first sync session, or across session start/stop cycles, do not need to re-attach.
End-to-end flow:
Performance characteristics
All hot paths are bounded by batch size, with no dependency on the total tracked-set size:
- Scope 1: `O(|batch|)` via insert/delete count netting on the filtered stream. No copy of `sentToD2Keys`.
- Scope 2: `O(|batch|)` using a nested `Map<collectionId, Map<key, Entry>>` (primitive keys, no serialization).
- Scope 3: `O(|batch|)` using `Map<TKey, Entry>` (primitive keys, no serialization).
Why not just expose `CollectionSubscription.subscribedKeys`?
Two reasons:
- `sentToD2Keys` is per-alias. A live query with multiple aliases on the same collection (self-joins) would require consumers to dedupe. Scope 2 exists precisely to handle that.
- `subscribeChanges` buffers truncate/refetch to hide transient empty states. `subscribeTrackedSourceRecords` needs to emit net membership. Deriving one from the other would mix concerns.
Three scopes with clear ownership match the three semantic views application code may want.
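To show why the middle scope earns its place, here is a condensed sketch of the per-live-query aggregation described under Scope 2. `LiveQueryAggregatorSketch` and its `emit` callback are hypothetical simplifications; in the PR the aggregator forwards to each source collection's manager and to the live-query listener set rather than to a single callback.

```typescript
type Key = string | number
type Emit = (collectionId: string, added: Key[], removed: Key[]) => void

// Hypothetical, reduced model of the per-live-query aggregator.
class LiveQueryAggregatorSketch {
  private counts = new Map<string, Map<Key, number>>()
  private exposed = false
  private emit: Emit

  constructor(emit: Emit) {
    this.emit = emit
  }

  apply(
    collectionId: string,
    added: ReadonlyArray<Key>,
    removed: ReadonlyArray<Key>,
  ): void {
    const bucket = this.counts.get(collectionId) ?? new Map<Key, number>()
    this.counts.set(collectionId, bucket)
    const netAdded: Key[] = []
    const netRemoved: Key[] = []
    for (const key of added) {
      const next = (bucket.get(key) ?? 0) + 1
      bucket.set(key, next)
      if (next === 1) netAdded.push(key) // 0→1 across aliases
    }
    for (const key of removed) {
      const current = bucket.get(key)
      if (current === undefined) continue
      if (current === 1) {
        bucket.delete(key)
        netRemoved.push(key) // 1→0 across aliases
      } else {
        bucket.set(key, current - 1)
      }
    }
    if (bucket.size === 0) this.counts.delete(collectionId) // prune
    if (this.exposed && (netAdded.length > 0 || netRemoved.length > 0)) {
      this.emit(collectionId, netAdded, netRemoved)
    }
  }

  // Lifecycle gate: replay the snapshot as added or removed on each flip.
  setExposed(exposed: boolean): void {
    if (exposed === this.exposed) return
    this.exposed = exposed
    this.counts.forEach((bucket, collectionId) => {
      const keys = Array.from(bucket.keys())
      if (exposed) this.emit(collectionId, keys, [])
      else this.emit(collectionId, [], keys)
    })
  }
}

const log: string[] = []
const agg = new LiveQueryAggregatorSketch((id, added, removed) => {
  log.push(`${id} +[${added.join(',')}] -[${removed.join(',')}]`)
})
agg.apply('emp', [1, 2], []) // alias "employee"
agg.apply('emp', [2], []) // alias "manager" also reads key 2
agg.setExposed(true) // replay: emp +[1,2] -[]
agg.apply('emp', [], [2]) // refcount 2→1: nothing emitted
agg.apply('emp', [], [2]) // refcount 1→0: emp +[] -[2]
agg.setExposed(false) // replay: emp +[] -[1]
```

The self-join case is the one a per-alias set cannot express on its own: key 2 is only reported as removed once both aliases have let go of it.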
What's not changed
The feature is purely additive on `CollectionImpl`. `Collection.utils` and `createCollection` are byte-identical to upstream: no in-place mutation of user-supplied utils, no merge-precedence changes. The live-query path's utils merge stays at upstream's `{ ...options.utils, ...config.utils }` (config wins). Tracked-source helpers are not on `utils` at all; they're direct methods on `CollectionImpl`.
How was this change tested?
Regression coverage in `packages/db/tests/query/live-query-collection.test.ts`:
- `includeInitialState`, plus snapshot-as-removed on last-subscriber unsubscribe.
- `includeInitialState` replay.
Plus a regression test in `packages/db/tests/utility-exposure.test.ts` pinning that user-supplied `utils` keys override built-ins on collision (the upstream merge-precedence contract).
The feature lives entirely on `CollectionImpl` and the live-query pipeline (`CollectionSubscriber` → aggregator → manager); none of the sync backends are touched, so the in-memory tests exercise the same code paths that electric, query-db, localStorage, etc. would hit.
Screenshots